package io.cess.mqtt.demo.config;

import cn.hutool.core.thread.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

/**
 * @author wcl
 * @version 1.0
 * @date 2019/12/14 5:50 下午
 */
@Primary
@Component
public class MqttCallback implements MqttCallbackExtended {

    @Autowired
    MqttClientBean mqttClientBean;
    @Autowired
    MqttConfig mqttConfig;
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        /**
         * 客户端连接成功后就需要尽快订阅需要的 topic
         */
        System.out.println("connect success");

        ExecutorService mqttExecutorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttExecutorService.submit(() -> {
            try {
                //订阅主题，主主题后面可以跟子主题 过滤规则 +:过滤一级 ，#:过滤所有
                final String[] topicFilter = {mqttConfig.getTopicId() + "/" + "testMq4Iot"};
                int qosLevel=0;
                final int[] qos = {qosLevel};
                MqttClient mqttClient = mqttClientBean.getMqttClient();
                mqttClient.subscribe(topicFilter, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        /**
         * 消费消息的回调接口，需要确保该接口不抛异常，该接口运行返回即代表消息消费成功。
         * 消费消息需要保证在规定时间内完成，如果消费耗时超过服务端约定的超时时间，对于可靠传输的模式，服务端可能会重试推送，业务需要做好幂等去重处理。超时时间约定参考限制
         * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
         */
        System.out.println(
                "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
    }
}
